AMQP Resources Chaos Engineering
rabbit_common has a module called delegate
that does the module calling in a safe way - abstracting having to call the gen_server.
But how do we fetch the Erlang PID to call delegate:invoke
?
rabbit uses pg
- process_groups to group a bunch of processes by module.
To fetch channels we can use pg_local:get_members(rabbit_channels)
and it should return something like this.
(rabbit@MacBook-Pro-de-Lucas)31> pg_local:get_members(rabbit_channels).
[<0.1322.0>,<0.1336.0>]
A simple snippet to gracefully force shutdown a channel in Erlang VM is:
Pid = hd(pg_local:get_members(rabbit_channels)).
delegate:invoke(Pid, {rabbit_channel, shutdown, []}).
%% one liner delegate:invoke(hd(pg_local:get_members(rabbit_channels)), {rabbit_channel, shutdown, []}).
Utility method to randomize channels shutdown by % of live channels
shutdown_random_channels(Percentage) when Percentage > 0, Percentage =< 100 ->
Channels = pg_local:get_members(rabbit_channels),
TotalCount = length(Channels),
ShutdownCount = ceil(TotalCount * (Percentage / 100)),
RandomChannels = lists:sublist(
lists:sort([{rand:uniform(), Chan} || Chan <- Channels]),
ShutdownCount
),
[delegate:invoke(Pid, {rabbit_channel, shutdown, []}) || {_, Pid} <- RandomChannels],
{ok, length(RandomChannels)};
shutdown_random_channels(_) ->
{error, "Percentage must be between 1 and 100"}.
To execute this remotely you can do something like:
rabbitmqctl eval 'Channels = pg_local:get_members(rabbit_channels), TotalCount = length(Channels), ShutdownCount = ceil(TotalCount * (25/100)), RandomChannels = lists:sublist(lists:sort([{rand:uniform(), Chan} || Chan <- Channels]), ShutdownCount), [delegate:invoke(Pid, {rabbit_channel, shutdown, []}) || {_, Pid} <- RandomChannels].'
The main problem seems to be that doing the above CLI it does not trigger a message from the Server->Client of Channel.Closed
How can we trigger it?
We can verify the Channel.Close never reaches the server by doing
const amqplib = require('amqplib');
const repl = require('repl');
async function createConnection() {
try {
// Create connection
const connection = await amqplib.connect('amqp://localhost');
// Create channel
const channel = await connection.createChannel();
// Handle channel closure
channel.on('close', () => {
console.log('Channel closed');
});
// Handle channel errors
channel.on('error', (err) => {
console.error('Channel error:', err);
});
return { connection, channel };
} catch (error) {
console.error('Error creating connection:', error);
throw error;
}
}
// Initialize connection and channel
let channelInstance;
createConnection().then(({ channel }) => {
channelInstance = channel;
console.log('RabbitMQ connection established');
});
// Start REPL server
const replServer = repl.start({
prompt: 'rabbitmq> '
});
// Expose publish function to REPL context
replServer.context.publish = async (queue, message) => {
if (!channelInstance) {
console.error('Channel not initialized');
return;
}
try {
await channelInstance.assertQueue(queue);
channelInstance.sendToQueue(queue, Buffer.from(message));
console.log(`Message sent to queue: ${queue}`);
} catch (error) {
console.error('Error publishing message:', error);
}
};
module.exports = createConnection;
A better way to do so is by using AMQP Protocol and forcing issues on it. If we declare a passive
queue within a Channel ( passive means running a declare won't create resources and fail if does not exist - with a Channel.Close
);
Call the method
rabbit_channel:do(Ch2, #'queue.declare'{ passive = true,
queue = "non-existent-queue" }),
Generate script.erl
and put in under the rabbit main folder
-module(script).
-compile(export_all).
-include("deps/rabbit_common/include/rabbit_framing.hrl").
-on_load(close/0).
close () ->
Channels = pg_local:get_members(rabbit_channels),
TotalCount = length(Channels),
ShutdownCount = ceil(TotalCount * (25/100)),
RandomChannels = lists:sublist(lists:sort([{rand:uniform(), Chan} || Chan <- Channels]), ShutdownCount),
[delegate:invoke(Pid, fun(P) -> rabbit_channel:do(P, #'queue.declare'{passive=true, queue = <<"non-existent-queue">>}) end) || {_, Pid} <- RandomChannels],
ok.
Then run:
expect -c 'spawn rabbitmq-diagnostics -n rabbit@MacBook-Pro-de-Lucas remote_shell; expect ">" { send "{ok, script} = c(\"script\").\r" }; expect "{ok,script}" { send "\x03" }; interact'
There were too many issues dealing with rabbitmq data structures, like importing .hrl
files so
I ended up creating a open-source plugin Rabbit Chaos API